home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Cream of the Crop 26
/
Cream of the Crop 26.iso
/
os2
/
pvm34b3.zip
/
pvm34b3
/
pvm3
/
src
/
lpvmmimd.c
< prev
next >
Wrap
C/C++ Source or Header
|
1997-07-22
|
50KB
|
2,013 lines
/* RCS identification string ($Id$ keyword expansion) for this source file. */
static char rcsid[] =
"$Id: lpvmmimd.c,v 1.16 1997/06/27 17:32:27 pvmsrc Exp $";
/*
* PVM version 3.4: Parallel Virtual Machine System
* University of Tennessee, Knoxville TN.
* Oak Ridge National Laboratory, Oak Ridge TN.
* Emory University, Atlanta GA.
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
* (C) 1997 All Rights Reserved
*
* NOTICE
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby granted
* provided that the above copyright notice appear in all copies and
* that both the copyright notice and this permission notice appear in
* supporting documentation.
*
* Neither the Institutions (Emory University, Oak Ridge National
* Laboratory, and University of Tennessee) nor the Authors make any
* representations about the suitability of this software for any
* purpose. This software is provided ``as is'' without express or
* implied warranty.
*
* PVM version 3 was funded in part by the U.S. Department of Energy,
* the National Science Foundation and the State of Tennessee.
*/
/*
* lpvmmimd.c
*
* Libpvm core for MPP environment.
*
$Log: lpvmmimd.c,v $
* Revision 1.16 1997/06/27 17:32:27 pvmsrc
* Updated for WIN32 header files & Authors.
*
* Revision 1.15 1997/06/12 20:10:44 pvmsrc
* Made sure all communications for TC_* task control messages
* use the SYSCTX_TC system context.
* - some messages being sent in default context... D-Oh...
*
* Revision 1.14 1997/05/29 15:13:35 pvmsrc
* Removed static decls:
* - pvmtrcsbf, pvmrouteopt now in lpvmglob.c / lpvm.h.
* - pvmtrcmid doesn't exist.
*
* Revision 1.13 1997/04/30 21:26:05 pvmsrc
* SGI Compiler Warning Cleanup.
*
* Revision 1.12 1997/04/07 21:09:12 pvmsrc
* pvm_addmhf() - new parameter interface
*
* Revision 1.11 1997/04/01 21:28:16 pvmsrc
* Damn Damn Damn.
* - pvm_recvinfo() returns a bufid, not an index. Damn.
*
* Revision 1.10 1997/04/01 20:48:19 pvmsrc
* Fixed tracer mbox usage:
* - pvm_getinfo() -> pvm_recvinfo(), new semantics handled (recvinfo
* sets rbuf implicitly, a la pvm_recv, need to save rbuf).
*
* Revision 1.9 1997/03/27 19:55:29 pvmsrc
* Fixed up pvmbeatask() to go get tracer info if spawned from shell:
* - env var info including trace mask, trace buffer size, trace opts.
* - use PVMTRACERCLASS mbox entry to fill in values, if matches
* on trctid, trcctx, and trctag.
*
* Revision 1.8 1997/03/06 21:50:19 pvmsrc
* Yanked out #includes for <netinet/in.h> and <netinet/tcp.h>.
* - dups with lpvm.h #includes...
*
* Revision 1.7 1997/01/28 19:26:29 pvmsrc
* New Copyright Notice & Authors.
*
* Revision 1.6 1996/12/19 20:17:04 pvmsrc
* Replaced old struct umbuf with new struct pmsg.
*
* Revision 1.5 1996/12/19 19:57:56 pvmsrc
* Eradicated remainder of old control message interface.
* - replaced pvmmctl() routine with individual messages handlers:
* * pvm_tc_shmat() (lpvmshmem.c only).
* * pvm_tc_conreq().
* * pvm_tc_conack().
* * pvm_tc_taskexit().
* - added appropriate calls to pvm_addmhf() in pvmbeatask().
* - removed calls to pvmmctl() in mroute() (peer_recv() in
* lpvmshmem.c), replaced with new mesg_input() call,
* use new pmsg_setenc() routine to set message encoding.
*
* Revision 1.4 1996/12/18 22:27:48 pvmsrc
* Extracted duplicate versions of routines from lpvm/mimd/shmem.c,
* inserted into shared lpvmgen.c:
* - pvmbailout().
* - pvmlogerror().
* - vpvmlogprintf(), pvmlogprintf(). (hope these work on MPP & shmem)
* - pvmlogperror().
*
* Revision 1.3 1996/10/25 13:57:27 pvmsrc
* Replaced old #includes for protocol headers:
* - <pvmsdpro.h>, "ddpro.h", "tdpro.h"
* With #include of new combined header:
* - <pvmproto.h>
*
* Revision 1.2 1996/10/24 22:44:33 pvmsrc
* Modified for New Tracing Facility:
* - moved #include "global.h" below other #include's for typing.
* - removed extra #include <pvm3.h> in lpvm.c...
* - added #include of new "lpvm.h" to replace explicit externs.
* - removed common control message handlers from lpvm.c:
* * extracted to lpvmgen.c for general usage.
* * pvm_tc_noop(), pvm_tc_settmask(), pvm_tc_siblings().
* -> lpvmmimd.c & lpvmshmem.c still need remainder of pvmmctl()
* replaced with control message handlers.
* - arg typing hassles with int_compare() / qsort() exacerbated...
* - modified pvmbeatask():
* * handle new tracing info, unpack tracing and output collection
* parameters into temp storage, and then check for local task
* override before applying.
* * read in new tracing env vars PVMTRCBUF & PVMTRCOPT.
* * install new common message handlers.
* * call new tev_init() routine to set up tracing stuff.
* * use new Pvmtracer structures (pvmtrc & pvmctrc) to store info.
* - removed pvm_getopt() & pvm_setopt() -> moved to common lpvmgen.c.
* - removed old tev_begin(), tev_fin() & tev_do_trace() routines.
* - updated trace event generation for pvm_getfds(), pvm_start_pvmd(),
* pvm_precv(), pvm_psend().
*
* Revision 1.1 1996/09/23 23:44:17 pvmsrc
* Initial revision
*
* Revision 1.20 1995/11/02 16:12:23 manchek
* free replies to control messages in mxfer
*
* Revision 1.19 1995/11/02 16:11:15 manchek
* removed hdump()
*
* Revision 1.18 1995/09/06 17:37:25 manchek
* aargh, forgot pvm_precv
*
* Revision 1.17 1995/09/06 17:32:39 manchek
* pvm_psend returns not implemented instead of bad param for string type
*
* Revision 1.16 1995/09/05 19:16:11 manchek
* changed some comments
*
* Revision 1.15 1995/07/28 16:40:58 manchek
* wrap HASERRORVARS around errno declarations
*
* Revision 1.14 1995/07/18 19:03:35 manchek
* added code to generate and check crc on each message (MCHECKSUM)
*
* Revision 1.13 1995/06/28 18:19:09 manchek
* do-nothing check_for_exit so one can be in lpvmshmem.c
*
* Revision 1.12 1995/06/19 17:49:22 manchek
* was packing random string in TC_CONACK message in pvmmctl
*
* Revision 1.11 1995/06/16 16:07:54 manchek
* set debug mask and trace mask from environment vars.
* hack to set trace and output sink and tag for PGON
*
* Revision 1.10 1995/06/12 15:58:52 manchek
* added PGON partition size support
*
* Revision 1.9 1995/05/30 17:29:52 manchek
* Added ifdefs for SP2MPI architecture.
* Fixed bug in pvm_precv.
* Use asyncsend and probe for incoming messages in pvm_psend.
* Prefix my_node, etc. with "pvm" and make them static to avoid name clashes.
* Fix in mroute() to handle null message
*
* Revision 1.8 1995/02/01 21:11:45 manchek
* error 4 is now PvmOverflow
*
* Revision 1.7 1994/12/20 16:39:05 manchek
* added pvmshowtaskid variable
*
* Revision 1.6 1994/11/07 22:39:59 manchek
* Modify node_mcast() to handle DataInPlace correctly.
* Modify pvm_precv() to deal with packets from service nodes. Those
* packets have PVM headers.
* Change the limit on pvmfrgsiz in pvm_setopt() to MAXFRAGSIZE.
* Respond to RouteDirect requests
*
* Revision 1.5 1994/09/02 15:25:03 manchek
* fixed typos in pvm_precv - rtid should be rlen
*
* Revision 1.4 1994/06/03 20:38:17 manchek
* version 3.3.0
*
* Revision 1.3 1993/12/20 15:39:15 manchek
* patch 6 from wcj
*
* Revision 1.1 1993/08/30 23:26:48 manchek
* Initial revision
*
*/
#include <stdio.h>
#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/stat.h>
#ifdef IMA_CM5
#include <unistd.h>
#endif
#include <fcntl.h>
#ifdef SYSVSTR
#include <string.h>
#else
#include <strings.h>
#endif
#include <errno.h>
#ifdef IMA_PGON
#include <nx.h>
#endif
#ifdef IMA_I860
#include <cube.h>
#endif
#ifdef IMA_SP2MPI
#include <sys/socket.h>
#include <sys/select.h>
#include "mpi.h"
#endif
#include <pvm3.h>
#include <pvmproto.h>
#include "pvmalloc.h"
#include "pvmfrag.h"
#include "pmsg.h"
#include "listmac.h"
#include "pvmdmp.h"
#include "pvmmimd.h"
#include "bfunc.h"
#include "lpvm.h"
#include <pvmtev.h>
#include "tevmac.h"
#include "global.h"
/*
 * Provide max() only when nothing included above has defined it.
 * It is a plain macro, so the argument that wins is evaluated twice.
 */
#ifndef max
#define max(a,b) ((a)>(b)?(a):(b))
#endif
/* Forward declarations (old-style / __ProtoGlarp__-wrapped prototypes). */
char *getenv();
void hex_inadport __ProtoGlarp__ (( char *, struct sockaddr_in * ));
extern struct encvec *enctovec();
/***************
** Globals **
** **
***************/
/*
 * Declare the libc error variables here unless the build defines
 * HASERRORVARS.
 */
#ifndef HASERRORVARS
extern int errno; /* from libc */
extern char *sys_errlist[];
extern int sys_nerr;
#endif
/*
 * Bit masks used to pick the individual fields out of a task id.
 * node_send() and node_mcast() compare (tid & mask) values built from
 * these to decide whether a destination can be reached directly.
 */
int pvmtidhmask = TIDHOST; /* mask - host field of tids */
int pvmtidnmask = TIDNODE; /* mask - node field of tids */
int pvmtidtmask = TIDPTYPE; /* mask - proc type field of tids */
int pvmpgonpartsize = 0; /* pgon parent partition size */
/***************
** Private **
** **
***************/
/* Debug mask; pvmbeatask() loads it from the PVMTASKDEBUG environment variable. */
static int pvmdebmask = 0; /* which debugging info */
static char pvmtxt[512]; /* scratch for error log */
/* List head for messages whose fragments are still arriving (see mroute()). */
static struct pmsg *rxfrag = 0; /* not-assembled incm msgs */
/* Holds the result of the most recent PVMCRECV/PVMCSEND/MPI call made in this file. */
static int mpierrcode = 0; /* error code returned by MPI calls */
/*
 * Table of in-flight asynchronous sends used by node_send(): each slot
 * pairs a send handle (tf_mhdl) with the fragment it references (tf_fp).
 * A slot is free when tf_mhdl == PVMMHDNULL.
 */
static struct tmpfrag outfrags[NUMSMHD]; /* fragments queued by async send */
/* Slot of outfrags[] that node_send() will try to use next. */
static int nextsmhd = 0; /* index of current isend mhdl */
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
static struct pmsg *precvlist = 0; /* not-processed incm msgs */
#endif
static long pvmmyptype = 0; /* my process type */
static long pvmhostnode = -1; /* host node number */
static int pvmmynode = -1; /* my node number */
#ifdef IMA_I860
static int pvmmyvnode = -1; /* my virtual node number */
#endif
static int mysetpart = 0; /* host field & set field */
/* static int recvmask = 0x80000000 + (1<<PMTDN) + (1<<PMTNN) + (1<<PMTMCAST);*/
/**************************
** Internal Functions **
** **
**************************/
/* pvm_tc_conreq()
*
* Another task requests a connection with us.
* Reply with a TC_CONACK message.
*
* TC_CONREQ() {
* int tdprotocol // t-d protocol revision number
* string sockaddr // address of other socket
* }
*/
static int
pvm_tc_conreq(mid)
int mid;
{
int src; /* sender of request */
int rbf; /* temp rx message storage */
int sbf = 0; /* return message to send */
int ttpro; /* protocol revision */
int ackd; /* allow connection if 0 */
int i;
int ictx;
rbf = pvm_setrbuf(mid);
pvm_bufinfo(mid, (int *)0, (int *)0, &src);
pvmlogprintf("pvm_tc_conreq() TCP conn request from t%x!\n", src);
sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
ttpro = TDPROTOCOL;
ackd = 1;
pvm_pkint(&ttpro, 1, 1);
pvm_pkint(&ackd, 1, 1);
pvm_pkstr("");
i = pvmrescode;
pvmrescode = 1;
ictx = pvm_setcontext(SYSCTX_TC);
pvm_send(src, TC_CONACK);
pvm_setcontext(ictx);
pvmrescode = i;
pvm_freebuf(pvm_setsbuf(sbf));
pvm_setrbuf(rbf);
pvm_freebuf(mid);
return 0;
}
/*	pvm_tc_conack()
*
*	Handler for an incoming TC_CONACK, i.e. a reply to a connection
*	request.  Here the message is only reported in the log as
*	unexpected and then discarded.
*
*	TC_CONACK() {
*		int tdprotocol		// t-d protocol revision number
*		int ack				// 0 ok, 1 denied
*		string sockaddr		// address of other socket
*	}
*
*	mid is freed; the previous receive buffer is reinstated.
*	Always returns 0.
*/
static int
pvm_tc_conack(mid)
	int mid;
{
	int sender;			/* tid the reply came from */
	int prev_rbuf;		/* receive buffer active on entry */

	prev_rbuf = pvm_setrbuf(mid);
	pvm_bufinfo(mid, (int *)0, (int *)0, &sender);

	pvmlogprintf("pvm_tc_conack() unexpected TC msg from t%x!\n", sender);

	pvm_setrbuf(prev_rbuf);
	pvm_freebuf(mid);

	return 0;
}
/*	pvm_tc_taskexit()
*
*	Handler for the notification that a task we have a direct route
*	to has exited.  Here the message is only reported in the log as
*	unexpected and then discarded.
*
*	mid is freed; the previous receive buffer is reinstated.
*	Always returns 0.
*/
static int
pvm_tc_taskexit(mid)
	int mid;
{
	int sender;			/* tid the notify came from */
	int prev_rbuf;		/* receive buffer active on entry */

	prev_rbuf = pvm_setrbuf(mid);
	pvm_bufinfo(mid, (int *)0, (int *)0, &sender);

	pvmlogprintf("pvm_tc_taskexit() unexpected TC msg from t%x!\n",
			sender);

	pvm_setrbuf(prev_rbuf);
	pvm_freebuf(mid);

	return 0;
}
/*	mroute()
*
*	Route a message to a destination.
*	Returns when
*		outgoing message (if any) fully sent and
*		(timed out (tmout) or
*			at least one message fully received)
*	Returns >=0 the number of complete messages downloaded, or
*	negative on error.
*
*	mid		message to send, or 0 to receive only
*	dtid	destination tid
*	code	message type code
*	tmout	null: block until a message is received;
*			{0,0}: poll once (and return right after sending, if a
*			message was sent);
*			otherwise: give up once that much time has elapsed.
*
*	Multicast requests (TM_MCA to the pvmd, or anything addressed to
*	TIDGID) are handed to node_mcast() instead.
*
*	A zero-length PMTPACK message announces that a packed message
*	follows, so after seeing one the routine switches to blocking mode
*	to make sure the real message is consumed here.
*/

int
mroute(mid, dtid, code, tmout)
	int mid;				/* message */
	int dtid;				/* dest */
	int code;				/* type code */
	struct timeval *tmout;	/* get at least one message */
{
	struct pmsg *txup;			/* tx message or null */
	struct frag *txfp = 0;		/* cur tx frag or null */
	int gotem = 0;				/* count complete msgs downloaded */
	struct pmsg *rxup;			/* rx message */
	struct frag *fp = 0;
	char *cp = 0;
	int src;
	int dst;
	int ff;
	int block;					/* get at least one message */
	struct timeval tnow, tstop;
	int len;
	long node;
#ifdef IMA_SP2MPI
	MPI_Status info;
#else
	long info[8];				/* info about pending message */
#endif
#ifdef IMA_SP2MPI
	int mpiflag = 0;
	int mpisiz;
#endif

	/* XXX do we really have to do this? */
	if ((dtid == TIDPVMD && code == TM_MCA) || dtid == TIDGID)
		return node_mcast(mid, dtid, code);

	/*
	 * tstop.tv_sec == -1 means "no deadline".  It is set up front so
	 * that the poll case, which can be promoted to blocking by a null
	 * message below, never reads an uninitialized deadline.
	 */
	tstop.tv_sec = -1;
	tstop.tv_usec = -1;

	if (tmout) {
		if (tmout->tv_sec || tmout->tv_usec) {
			pvmgetclock(&tnow);
			tstop.tv_sec = tnow.tv_sec + tmout->tv_sec;
			tstop.tv_usec = tnow.tv_usec + tmout->tv_usec;
			/* carry whole seconds so the comparison below is exact */
			while (tstop.tv_usec >= 1000000) {
				tstop.tv_usec -= 1000000;
				tstop.tv_sec++;
			}
			block = 1;
		} else
			block = 0;
	} else {
		block = 1;
	}

	if ((txup = midtobuf(mid))) {
		txfp = txup->m_frag->fr_link;
		if (!txfp->fr_buf) {
			/* empty message: give it one empty frag with header room */
			txfp = fr_new(MAXHDR);
			txfp->fr_dat += MAXHDR;
			LISTPUTBEFORE(txup->m_frag, txfp, fr_link, fr_rlink);
		}
		node_send(txup, txfp, dtid, code);
		if (!block)
			return 0;
	}

	do {
		if (block && tstop.tv_sec != -1) {
			pvmgetclock(&tnow);
			if (tnow.tv_sec > tstop.tv_sec
			|| (tnow.tv_sec == tstop.tv_sec && tnow.tv_usec >= tstop.tv_usec))
				break;
		}

		/*
		 * Receive PMTPACK msg only; psent msg can have any (user-supplied)
		 * type not equal to PMTPACK. They're handled by precv.
		 */
		node = MPPANYNODE;
		if (MSGPROBE(node, PMTPACK, info)) {
			/*
			 * 1. Null msg (except I860) from pvmd, followed by the real msg.
			 * 2. Header (inplace), followed by msg body.
			 * 3. Entire msg from another node.
			 */
			if ((len = MSGSIZE(info))) {
				fp = fr_new(len);
				cp = fp->fr_dat;
			}
			if ((mpierrcode = PVMCRECV(node, PMTPACK, cp, len, 0, info))
#if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
			&& (mpierrcode < 0)
#endif
			) {
				pvmlogperror("mroute() PVMCRECV");
				return PvmSysErr;
			}
			if (!len) {
				/*
				 * Discard null msg. Now we must wait for the real (packed)
				 * msg, otherwise it could get picked up later by precv and
				 * cause problems.
				 */
				block = 1;
				continue;
			}
			if ((len = pvmget32(cp + 8) + TDFRAGHDR) != fp->fr_max) {
				struct frag *hdr;		/* buffer for header */

				/* inplace data, head & body sent separately */
				hdr = fp;
				fp = fr_new(len);
				len = hdr->fr_max;
				BCOPY(hdr->fr_dat, fp->fr_dat, len);
				fr_unref(hdr);
				node = MSGSENDER(info);
#ifdef IMA_I860
				/* msg type is (virtual) node # */
				if (_crecv(node, fp->fr_dat + len, fp->fr_max - len) < 0) {
#else
				if ((mpierrcode = PVMCRECV(node, PMTPACK, fp->fr_dat + len,
				fp->fr_max - len, 0, info))
#if defined(IMA_PGON) || defined(IMA_CM5)
				&& (mpierrcode < 0)
#endif
				) {
#endif
					pvmlogperror("mroute() PVMCRECV msg body");
					return PvmSysErr;
				}
				cp = fp->fr_dat;
			}

			/* decode the t-d fragment header and step past it */
			dst = pvmget32(cp);
			src = pvmget32(cp + 4);
			fp->fr_len = pvmget32(cp + 8);
			ff = pvmget8(cp + 12);
			fp->fr_dat += TDFRAGHDR;

			if (pvmdebmask & PDMPACKET) {
				sprintf(pvmtxt, "mroute() src t%x len %d dst t%x\n",
						src, fp->fr_len, dst);
				pvmlogerror(pvmtxt);
			}

			/*
			 * if start of message, make new umbuf, add to frag pile
			 */
			if (ff & FFSOM) {
				cp += TDFRAGHDR;
				fp->fr_len -= TTMSGHDR;
				fp->fr_dat += TTMSGHDR;
				rxup = umbuf_new();
				rxup->m_tag = pvmget32(cp);
				rxup->m_enc = pvmget32(cp + 4);
				rxup->m_wid = pvmget32(cp + 8);
				rxup->m_crc = pvmget32(cp + 12);
				rxup->m_src = src;
				LISTPUTBEFORE(rxfrag, rxup, m_link, m_rlink);
			}

			/* locate frag's message */
			for (rxup = rxfrag->m_link; rxup != rxfrag; rxup = rxup->m_link)
				if (rxup->m_src == src)
					break;

			if (rxup == rxfrag) {	/* uh oh, no message for it */
				pvmlogerror("mroute() frag with no message\n");
				fr_unref(fp);

			} else {
				LISTPUTBEFORE(rxup->m_frag, fp, fr_link, fr_rlink);
				rxup->m_len += fp->fr_len;
				/*
				 * if end of message, move to rxlist and count it
				 */
				if (ff & FFEOM) {
					LISTDELETE(rxup, m_link, m_rlink);
#ifdef MCHECKSUM
					if (rxup->m_crc != umbuf_crc(rxup)) {
						sprintf(pvmtxt,
						"mxinput() message src t%x cod %d bad checksum\n",
								rxup->m_src, rxup->m_tag);
						pvmlogerror(pvmtxt);
						umbuf_free(rxup);

					} else {
#endif
						pmsg_setenc(rxup, rxup->m_enc);
						mesg_input(rxup);
						gotem++;
#ifdef MCHECKSUM
					}
#endif
				}
			}
		}
	} while (block && !gotem);

	return gotem;
}
/*	node_send()
*
*	Send a message, fragment by fragment starting at txfp, to another
*	process.
*
*	If dtid is a node task with the same host (and, on PGON/CM5/SP2MPI,
*	process type) field as ours, the fragments go straight to that node;
*	otherwise they are sent to the pvmd on the host node.
*
*	Packed fragments bound for another node use an asynchronous send;
*	the frag is referenced and parked in outfrags[] until the send is
*	seen to have completed on a later call.  In-place fragments and
*	anything bound for the pvmd are sent synchronously.
*
*	Returns 0 when every fragment has been handed to the transport,
*	PvmSysErr if a send fails.
*/

int
node_send(txup, txfp, dtid, code)
	struct pmsg *txup;		/* tx message or null */
	struct frag *txfp;		/* cur tx frag or null */
	int dtid;				/* dest */
	int code;				/* type code */
{
	int mask = pvmtidhmask;		/* host */
	char *txcp = 0;				/* point to remainder of txfp */
	int txtogo = 0;				/* len of txfp */
	long node;					/* destination node */
	long ptype;					/* destination process type */
	int i;
	int ff;
	char dummy[TDFRAGHDR+TTMSGHDR];	/* for inplace data */
#ifdef IMA_SP2MPI
	MPI_Status mpista;
	int mpiflag = 0;
#endif

#if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)
	mask |= pvmtidtmask;		/* process type */
#endif
	if (TIDISNODE(dtid) && (dtid & mask) == (pvmmytid & mask)) {
		node = dtid & pvmtidnmask;
		ptype = pvmmyptype;			/* send to node directly */
	} else {
		node = pvmhostnode;
		ptype = PVMDPTYPE;			/* send to pvmd first */
	}

	do {
		/* check any pending sends; free data if send is completed */
		if (nextsmhd == NUMSMHD)
			nextsmhd = 0;
		i = nextsmhd;
		/* the index test keeps the scan inside outfrags[] */
		while (i < NUMSMHD && outfrags[i].tf_mhdl != PVMMHDNULL &&
		ASYNCDONE(outfrags[i].tf_mhdl) > 0) {
			fr_unref(outfrags[i].tf_fp);
#ifdef IMA_CM5
			CMMD_free_mcb(outfrags[i].tf_mhdl);
#endif
			outfrags[i++].tf_mhdl = PVMMHDNULL;
		}
		if (outfrags[nextsmhd].tf_mhdl != PVMMHDNULL) {
			if (pvmdebmask & PDMPACKET)
				pvmlogerror("out of mid's?");
			nextsmhd++;		/* don't get stuck here; check out next mhd */
			continue;
		}

		if (txfp->fr_u.dab)			/* packed data */
			txcp = txfp->fr_dat;
		else						/* inplace data */
			txcp = dummy + sizeof(dummy);
		txtogo = txfp->fr_len;

		/*
		 * if this is first frag, prepend t-t header
		 */
		ff = 0;
		if (txfp->fr_rlink == txup->m_frag) {
			txcp -= TTMSGHDR;
			txtogo += TTMSGHDR;
			pvmput32(txcp, code);
			pvmput32(txcp + 4, (txup->m_enc == 2 ? pvmmydsig : txup->m_enc));
			pvmput32(txcp + 8, txup->m_wid);
#ifdef MCHECKSUM
			pvmput32(txcp + 12, umbuf_crc(txup));
#else
			pvmput32(txcp + 12, 0);
#endif
			ff = FFSOM;
		}
		if (txfp->fr_link == txup->m_frag)
			ff |= FFEOM;

		/*
		 * prepend t-d header
		 */
		txcp -= TDFRAGHDR;
		pvmput32(txcp, dtid);
		pvmput32(txcp + 4, pvmmytid);
		pvmput32(txcp + 8, txtogo);
		pvmput32(txcp + 12, 0);		/* to keep putrify happy */
		pvmput8(txcp + 12, ff);
		txtogo += TDFRAGHDR;

		if (pvmdebmask & PDMPACKET) {
			sprintf(pvmtxt, "node_send() dst t%x len %d ptype=%ld node=%ld\n",
					dtid, txfp->fr_len, ptype, node);
			pvmlogerror(pvmtxt);
		}

		if (!txfp->fr_u.dab) {			/* inplace data */
			/* headers first (built in dummy[]), then the user data */
			if ((mpierrcode = PVMCSEND(PMTPACK, txcp, txtogo- txfp->fr_len,
			node, ptype))
#if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
			&& (mpierrcode < 0)
#endif
			) {
				pvmlogperror("node_send() csend header");
				return PvmSysErr;
			}
#ifdef IMA_I860
			/* crecv can't select msg with node #; it can only select type */
			if (_csend(pvmmyvnode, txfp->fr_dat, txfp->fr_len, node, 0) < 0) {
				pvmlogperror("node_send() csend inplace data");
				return PvmSysErr;
			}
#else
			/* here the type field is reserved for psend/precv */
			if ((mpierrcode =
			PVMCSEND(PMTPACK, txfp->fr_dat, txfp->fr_len, node, ptype))
#if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
			&& (mpierrcode < 0)
#endif
			) {
				pvmlogperror("node_send() csend inplace data");
				return PvmSysErr;
			}
#endif
		} else {
			if (node != pvmhostnode) {
#ifdef IMA_SP2MPI
				if (mpierrcode = MPI_Isend(txcp, txtogo, MPI_BYTE, node,
				PMTPACK, MPI_COMM_WORLD, &outfrags[nextsmhd].tf_mhdl)) {
#else
				if ((outfrags[nextsmhd].tf_mhdl =
				ASYNCSEND(PMTPACK, txcp, txtogo, node, ptype)) < 0) {
#endif
					pvmlogperror("node_send() asyncsend");
					return PvmSysErr;
				}
				/* don't free data 'til frag's sent */
				txfp->fr_u.ref++;
				outfrags[nextsmhd++].tf_fp = txfp;
			} else {
				/*
				 * In multicast, the fragment header may get overwritten
				 * if we use ASYNCSEND. This is OK for node-node send,
				 * because the dst field is not used. But pvmd relies on
				 * this value to deliver the packet.
				 */
				if ((mpierrcode = PVMCSEND(PMTPACK, txcp, txtogo, node, ptype))
#if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
				&& (mpierrcode < 0)
#endif
				) {
					pvmlogperror("node_send() csend");
					return PvmSysErr;
				}
			}
		}

		/* advance; the list head (no fr_buf) marks the end */
		txfp = txfp->fr_link;
		if (!txfp->fr_buf)
			txfp = 0;
	} while (txfp);

	return 0;
}
/*	node_mcast()
*
*	Multicast support for node tasks.  It is driven by two successive
*	calls (see mroute()):
*
*	1. dtid == TIDPVMD: the message in mid holds the recipient list
*	   (count, then tids).  The list is unpacked and remembered, and a
*	   buffer containing TIDGID is installed as the current receive
*	   buffer.  Returns 0.
*
*	2. any other dtid: the message in mid is delivered to every tid
*	   remembered by step 1.  On PGON, recipients that are node tasks
*	   in our own partition are reached with a single broadcast or
*	   group send; everyone else goes through mroute().  The remembered
*	   list is released afterwards, whether or not delivery succeeded.
*
*	Returns the result of the last mroute() call (0 if none was made),
*	or PvmSysErr if a PGON send fails.
*/
int
node_mcast(mid, dtid, code)
	int mid;	/* message id */
	int dtid;	/* destination */
	int code;	/* type */
{
	int i;
	long count = 0;
	int cc = 0;
	static int *tids;		/* intended recipients of multicast message */
	static int ntask;		/* number of tids */
	int sbf;
	int tmp;
	static struct timeval ztv = { 0, 0 };
#if defined(IMA_PGON)
	long *nodes;
	int mask = pvmtidhmask;		/* host */
	struct pmsg *txup;			/* tx message or null */
	struct frag *txfp = 0;		/* cur tx frag or null */
	int txtogo = 0;				/* len of txfp */
	int ff;
	char *txcp = 0;				/* point to remainder of txfp */
	int len = 0;				/* len of txfp */
	char dummy[TDFRAGHDR+TTMSGHDR];	/* for inplace data */
#endif /*defined(IMA_PGON)*/

	/* intercept multicast info */
	if (dtid == TIDPVMD) {
		/* a list left over from an unfinished multicast is dropped */
		if (tids) {
			PVM_FREE(tids);
			tids = 0;
		}
		pvm_setrbuf(mid);
		pvm_upkint(&ntask, 1, 1);
		tids = TALLOC(ntask, int, "tids");
		pvm_upkint(tids, ntask, 1);
		/* build the TIDGID buffer and leave it as the receive buffer */
		sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
		tmp = TIDGID;
		pvm_pkint(&tmp, 1, 1);
		pvm_setrbuf(pvm_setsbuf(sbf));
		return 0;
	}

#if defined(IMA_PGON)
	mask |= pvmtidtmask;		/* process type */
	nodes = TALLOC(ntask, long, "nodes");
	for (i = 0; i < ntask; i++) {
		if (TIDISNODE(tids[i]) && (tids[i] & mask) == (pvmmytid & mask))
			nodes[count++] = tids[i] & pvmtidnmask;
		else
			cc = mroute(mid, tids[i], code, &ztv);
	}
	if (count) {
		if ((txup = midtobuf(mid))) {
			txfp = txup->m_frag->fr_link;
			txfp = txfp->fr_buf ? txfp : 0;
		}
		while (txfp) {
			if (txfp->fr_u.dab)			/* packed data */
				txcp = txfp->fr_dat;
			else						/* inplace data */
				txcp = dummy + sizeof(dummy);
			txtogo = txfp->fr_len;

			/* first frag gets the t-t message header */
			ff = 0;
			if (txfp->fr_rlink == txup->m_frag) {
				txcp -= TTMSGHDR;
				txtogo += TTMSGHDR;
				pvmput32(txcp, code);
				pvmput32(txcp + 4,
						(txup->m_enc == 2 ? pvmmydsig : txup->m_enc));
				pvmput32(txcp + 8, txup->m_wid);
#ifdef MCHECKSUM
				pvmput32(txcp + 12, umbuf_crc(txup));
#else
				pvmput32(txcp + 12, 0);
#endif
				ff = FFSOM;
			}
			if (txfp->fr_link == txup->m_frag)
				ff |= FFEOM;

			/* every frag gets the t-d fragment header */
			txcp -= TDFRAGHDR;
			pvmput32(txcp, dtid);
			pvmput32(txcp + 4, pvmmytid);
			pvmput32(txcp + 8, txtogo);
			pvmput32(txcp + 12, 0);		/* to keep putrify happy */
			pvmput8(txcp + 12, ff);
			txtogo += TDFRAGHDR;

			if (pvmdebmask & PDMPACKET) {
				sprintf(pvmtxt, "node_mcast() len %d\n", txfp->fr_len);
				pvmlogerror(pvmtxt);
			}

			if (count == numnodes() - 1) {
				/* recipients are all the other nodes: send to node -1 */
				if (!txfp->fr_u.dab) {			/* inplace data */
					if (_csend(PMTPACK, txcp, txtogo - txfp->fr_len, -1,
					pvmmyptype) < 0) {
						pvmlogperror("node_mcast() csend header");
						cc = PvmSysErr;
						goto pgon_done;
					}
					if (_csend(PMTPACK, txfp->fr_dat, txfp->fr_len, -1,
					pvmmyptype) < 0){
						pvmlogperror("node_mcast() csend inplace data");
						cc = PvmSysErr;
						goto pgon_done;
					}
				} else {						/* packed */
					if (_csend(PMTPACK, txcp, (long)txtogo, -1, pvmmyptype) < 0) {
						pvmlogperror("node_mcast() csend");
						cc = PvmSysErr;
						goto pgon_done;
					}
				}
			} else {
				/* send to the explicit node list */
				if (!txfp->fr_u.dab) {			/* inplace data */
					if (_gsendx(PMTPACK, txcp, txtogo - txfp->fr_len, nodes,
					count) < 0) {
						pvmlogperror("node_mcast() gsendx header");
						cc = PvmSysErr;
						goto pgon_done;
					}
					if (_gsendx(PMTPACK, txfp->fr_dat, txfp->fr_len, nodes,
					count) < 0) {
						pvmlogperror("node_mcast() gsendx inplace data");
						cc = PvmSysErr;
						goto pgon_done;
					}
				} else {
					if (_gsendx(PMTPACK, txcp, (long)txtogo, nodes, count) < 0){
						pvmlogperror("node_mcast() gsendx");
						cc = PvmSysErr;
						goto pgon_done;
					}
				}
			}
			txfp = txfp->fr_link;
			if (!txfp->fr_buf)
				txfp = 0;
		}
	}
pgon_done:
	/* reached on success and on send failure alike, so nodes never leaks */
	PVM_FREE(nodes);
#endif /*defined(IMA_PGON)*/

#if defined(IMA_CM5) || defined(IMA_I860) || defined(IMA_SP2MPI)
	for (i = 0; i < ntask; i++)
		cc = mroute(mid, tids[i], code, &ztv);
#endif

	/* forget the list; clearing the pointer guards against a second free */
	if (tids) {
		PVM_FREE(tids);
		tids = 0;
	}
	ntask = 0;
	return cc;
}
/*	msendrecv()
*
*	Send the current send buffer as a system message (usually to our
*	pvmd) and wait for the matching reply, in one operation.
*
*	other	tid the request goes to and the reply must come from
*	code	message code of the request; the reply carries the same one
*
*	The reply is taken off the received-message list and installed as
*	the current receive buffer, replacing (and freeing) the old one.
*
*	Returns the reply's message handle, PvmNoBuf when there is no send
*	buffer, or the negative code reported by mroute().  A TM_MCA
*	request is not waited for: 1 is returned once it has been routed.
*/

int
msendrecv(other, code)
	int other;				/* dst, src tid */
	int code;				/* message code */
{
	int rc;
	struct pmsg *cand;		/* reply candidate being examined */

	if (!pvmsbufmid)
		return PvmNoBuf;

	/* request */
	if (pvmdebmask & PDMMESSAGE) {
		sprintf(pvmtxt, "msendrecv() to t%x code %d\n", other, code);
		pvmlogerror(pvmtxt);
	}
	rc = mroute(pvmsbuf->m_mid, other, code, (struct timeval *)0);
	if (rc < 0)
		return rc;

	if (code == TM_MCA)		/* for node_mcast() */
		return 1;

	/* reply: walk the rx list, pulling in more messages when we run out */
	cand = pvmrxlist->m_link;
	for (;;) {
		if (cand == pvmrxlist) {
			/* step back to the last entry so new arrivals follow it */
			cand = cand->m_rlink;
			rc = mroute(0, 0, 0, (struct timeval *)0);
			if (rc < 0)
				return rc;
			cand = cand->m_link;
		}

		if (pvmdebmask & PDMMESSAGE) {
			sprintf(pvmtxt, "msendrecv() cmp from t%x code %d\n",
					cand->m_src, cand->m_tag);
			pvmlogerror(pvmtxt);
		}

		if (cand->m_src == other && cand->m_tag == code)
			break;

		cand = cand->m_link;
	}

	LISTDELETE(cand, m_link, m_rlink);

	/* make the reply the current receive buffer */
	if (pvmrbuf)
		umbuf_free(pvmrbuf);
	pvmrbuf = 0;
	rc = pvm_setrbuf(cand->m_mid);
	if (rc)
		return rc;

	return cand->m_mid;
}
#ifdef IMA_SP2MPI
/*	relay()
*
*	Relay messages between pvmd and node tasks.
*
*	dsock	socket connected to the pvmd
*
*	Runs a polling loop that never returns:
*	  - task -> pvmd: a wildcard MPI_Irecv is always posted; when it
*	    completes, the fragment (plus a separately sent body, for
*	    in-place data) is written to dsock, possibly over several
*	    iterations.
*	  - pvmd -> task: a fragment is read from dsock, header first and
*	    then the body, and forwarded to the node given by its
*	    destination tid, preceded by a zero-length PMTPACK message.
*
*	Unrecoverable MPI, select, read or write failures, and EOF on the
*	pvmd socket, end the process through exit().
*/
void relay(dsock)
	int dsock;					/* pvmd socket */
{
	fd_set wrk_rfds, wrk_wfds, rfds, wfds;
	int nfds;
	struct timeval tout;
	struct frag *frpvmd = 0;	/* (small) frag from pvmd */
	struct frag *topvmd = 0;	/* (big) frag to pvmd */
	struct frag *frtask = 0;	/* (big) frag from task */
	char *txcp = 0;				/* point to remainder of topvmd */
	int txtogo = 0;				/* len of remainder of topvmd */
	int toread;					/* number of bytes to be read from pvmd */
	int len;
	int dst;					/* dst of fragment being sent to node */
	int node;					/* node number */
	MPI_Request rmhd;			/* msg IDs returned by async recv */
	int n;
	int err;
	MPI_Status sta;				/* info on pending message */
	int dummy;
	int flag;					/* MPI_Test result */
	struct frag *hdr;

	nfds = dsock + 1;

	frtask = fr_new(MAXFRAGSIZE);
	if ((err = MPI_Irecv(frtask->fr_dat, frtask->fr_max, MPI_BYTE,
	MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &rmhd))) {
		MPI_Error_string(err, pvmtxt, &dummy);
		pvmlogerror(pvmtxt);
		pvmlogerror("relay() MPI_Irecv");
		pvm_exit();
		MPI_Finalize();
		exit(9);
	}

	FD_ZERO(&wrk_rfds);
	FD_ZERO(&wrk_wfds);
	FD_SET(dsock, &wrk_rfds);

	while (1) {		/* ferry messages between pvmd and tasks */
		/* zero timeout: select() only polls */
		tout.tv_sec = 0;
		tout.tv_usec = 0;

		/* pick up a frag from a task once the previous one is written */
		if (!topvmd && !MPI_Test(&rmhd, &flag, &sta) && flag) {
			topvmd = frtask;
			MPI_Get_count(&sta, MPI_BYTE, &txtogo);		/* w/ header */
			txcp = topvmd->fr_dat;
			if ((len = pvmget32(txcp + 8) + TDFRAGHDR) != txtogo) {
				/* inplace data, head & body sent separately */
				if (len > MAXFRAGSIZE) {
					hdr = topvmd;
					topvmd = fr_new(len);
					BCOPY(hdr->fr_dat, topvmd->fr_dat, txtogo);
					fr_unref(hdr);
					txcp = topvmd->fr_dat;
				}
				MPI_Recv(topvmd->fr_dat + txtogo, len - txtogo, MPI_BYTE,
						sta.MPI_SOURCE, PMTPACK, MPI_COMM_WORLD, &sta);
				txtogo = len;
			}
			frtask = fr_new(MAXFRAGSIZE);
			/* ready for the next message */
			if ((err = MPI_Irecv(frtask->fr_dat, frtask->fr_max, MPI_BYTE,
			MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &rmhd))) {
				MPI_Error_string(err, pvmtxt, &dummy);
				pvmlogerror(pvmtxt);
				pvmlogerror("relay() MPI_Irecv");
				pvm_exit();
				MPI_Finalize();
				exit(10);
			}
			FD_SET(dsock, &wrk_wfds);
		}

		rfds = wrk_rfds;
		wfds = wrk_wfds;
		if (select(nfds, &rfds, &wfds, (fd_set*)0, &tout) == -1) {
			if (errno != EINTR) {
				pvmlogperror("relay() select");
				pvm_exit();
				MPI_Finalize();
				exit(11);
			}
			/* interrupted: the fd sets can't be trusted, so poll again */
			continue;
		}

		if (FD_ISSET(dsock, &rfds)) {
			if (!frpvmd) {
				frpvmd = fr_new(MAXFRAGSIZE);
				toread = TDFRAGHDR;
			}
			n = read(dsock, frpvmd->fr_dat + frpvmd->fr_len, toread);
			if (n == -1) {
				if (errno != EWOULDBLOCK && errno != EINTR) {
					pvmlogperror("relay() read pvmd sock");
					MPI_Finalize();
					exit(12);
				}
				/*
				 * Nothing was read.  Leave fr_len and toread alone
				 * (adding -1 to them would corrupt the reassembly
				 * state) and try again on the next pass.
				 */
				continue;
			}
			if (!n) {
				/* EOF on the pvmd socket */
				MPI_Finalize();
				exit(13);
			}
			if ((frpvmd->fr_len += n) < TDFRAGHDR) {
				/* header still incomplete */
				toread -= n;
				continue;
			}
			n = TDFRAGHDR + pvmget32(frpvmd->fr_dat + 8);	/* header + body */
			if (frpvmd->fr_len < n) {
				if (frpvmd->fr_max < n) {		/* n > MAXFRAGSIZ */
					hdr = frpvmd;
					frpvmd = fr_new(n);
					BCOPY(hdr->fr_dat, frpvmd->fr_dat, hdr->fr_len);
					frpvmd->fr_len = hdr->fr_len;
					fr_unref(hdr);
				}
				toread = n - frpvmd->fr_len;
				continue;
			}

			dst = pvmget32(frpvmd->fr_dat);
			node = dst & pvmtidnmask;
			/* inform precv/recv of a packed message */
			while ((err =
			MPI_Bsend(&node, 0, MPI_BYTE, node, PMTPACK, MPI_COMM_WORLD))) {
				MPI_Error_string(err, pvmtxt, &dummy);
				pvmlogerror(pvmtxt);
				sprintf(pvmtxt, "relay() can't send null msg to t%x", dst);
				pvmlogerror(pvmtxt);
			}
			while ((err = MPI_Bsend(frpvmd->fr_dat, frpvmd->fr_len, MPI_BYTE,
			node, PMTPACK, MPI_COMM_WORLD))) {
				MPI_Error_string(err, pvmtxt, &dummy);
				pvmlogerror(pvmtxt);
				sprintf(pvmtxt, "relay() can't send to t%x", dst);
				pvmlogerror(pvmtxt);
			}
			fr_unref(frpvmd);
			frpvmd = 0;
		}

		if (FD_ISSET(dsock, &wfds)) {
			n = write(dsock, txcp, txtogo);
			if (n == -1 && errno != EWOULDBLOCK && errno != EINTR) {
				pvmlogperror("relay() write pvmd sock");
				MPI_Finalize();
				exit(14);
			}
			if (n > 0 && (txtogo -= n) > 0)
				txcp += n;
			if (!txtogo) {		/* entire message sent */
				FD_CLR(dsock, &wrk_wfds);
				fr_unref(topvmd);
				topvmd = 0;
			}
		}
	}
}
/*	pvmhost()
*
*	We're the "host" process. Connect to pvmd.
*
*	Steps:
*	  - open a TCP connection to the address found in $PVMSOCK
*	  - take our pid from $PVMEPID
*	  - write an empty message (headers only) addressed to the pvmd
*	    and read back SIZEHINFO ints of configuration
*	  - broadcast that configuration to all processes
*	  - call relay(), which does not return
*
*	Any failure ends the process through exit() with a distinct code.
*/
void
pvmhost()
{
	char *p;
	int dsock;						/* pvmd socket */
	struct sockaddr_in dsadr;		/* address of pvmd socket */
	int n;
	int pvminfo[SIZEHINFO];			/* ntask, hostpart, ptid, MTU, NDF */
	char nullmsg[TDFRAGHDR+TTMSGHDR];
	char *infop;					/* where the next pvminfo bytes go */
	int want;						/* pvminfo bytes still to be read */
	int got;

	if (!(p = getenv("PVMSOCK"))) {
		pvmlogerror("pvmhost() getenv() pvmd socket\n");
		MPI_Finalize();
		exit(2);
	}
	mpierrcode = 0;
	if ((dsock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
		pvmlogperror("pvmhost() socket");
		MPI_Finalize();
		exit(3);
	}
	BZERO((char*)&dsadr, sizeof(dsadr));
	hex_inadport(p, &dsadr);
	dsadr.sin_family = AF_INET;
	n = sizeof(dsadr);
	/* retry the connect for as long as it is merely interrupted */
	while (connect(dsock, (struct sockaddr*)&dsadr, n) == -1)
		if (errno != EINTR) {
			pvmlogperror("pvmhost() connect");
			MPI_Finalize();
			exit(4);
		}
#ifndef NOSOCKOPT
	n = 1;
	if (setsockopt(dsock, IPPROTO_TCP, TCP_NODELAY, (char*)&n, sizeof(int))
	== -1) {
		pvmlogperror("pvmhost() setsockopt");
		MPI_Finalize();
		exit(5);
	}
#endif

	if (!(p = getenv("PVMEPID"))) {
		pvmlogerror("pvmhost() getenv() pid\n");
		MPI_Finalize();
		exit(6);
	}
	pvmmyupid = atoi(p);

	/* headers-only message: dst pvmd, src our pid, single frag */
	BZERO(nullmsg, TDFRAGHDR+TTMSGHDR);
	pvmput32(nullmsg, TIDPVMD);
	pvmput32(nullmsg + 4, pvmmyupid);
	pvmput32(nullmsg + 8, TTMSGHDR);
	pvmput32(nullmsg + 12, 0);		/* to keep putrify happy */
	pvmput8(nullmsg + 12, FFSOM|FFEOM);
	if (write(dsock, nullmsg, TDFRAGHDR+TTMSGHDR) != TDFRAGHDR+TTMSGHDR) {
		pvmlogperror("pvmhost() write/read");
		MPI_Finalize();
		exit(8);
	}

	/*
	 * A stream socket may hand the reply over in pieces, so keep
	 * reading until all of pvminfo has arrived.
	 */
	infop = (char *)pvminfo;
	want = SIZEHINFO * sizeof(int);
	while (want > 0) {
		got = read(dsock, infop, want);
		if (got == -1 && errno == EINTR)
			continue;
		if (got <= 0) {			/* error or EOF */
			pvmlogperror("pvmhost() write/read");
			MPI_Finalize();
			exit(8);
		}
		infop += got;
		want -= got;
	}

	MPI_Bcast(pvminfo, SIZEHINFO, MPI_INT, pvmhostnode, MPI_COMM_WORLD);
	(void)relay(dsock);
}
#endif /*IMA_SP2MPI*/
/* pvmbeatask()
*
* Initialize libpvm, config process as a task.
* This is called as the first step of each libpvm function so no
* explicit initialization is required.
*
* Returns 0 if okay, else error code.
*/
int
pvmbeatask()
{
int pvminfo[SIZEHINFO]; /* proto, hostpart, ptid, MTU, NDF */
long rmid = -1; /* msg ID returned by irecv() */
int i;
int cc;
int ac = 0;
char *p;
struct pvmminfo minfo;
int n;
#ifdef IMA_SP2MPI
char *msgbuf; /* buffer for Bsend */
int msgbufsiz;
#endif
int need_trcinfo = 0;
int new_tracer = 0;
char tmask[ 2 * TEV_MASK_LENGTH ];
int tbuf, topt;
int mid, rbf;
TEVDECLS
if (pvmmytid != -1)
return 0;
TEV_EXCLUSIVE;
if (p = getenv("PVMTASKDEBUG")) { /* read the debug mask */
pvmdebmask = pvmxtoi(p);
if (pvmdebmask) {
sprintf(pvmtxt,"task debug mask is 0x%x\n", pvmdebmask);
pvmlogerror(pvmtxt);
}
}
#if defined(IMA_PGON) || defined(IMA_I860)
if ((rmid = _irecv(PMTHOST, pvminfo, sizeof(pvminfo))) < 0) {
pvmlogperror("beatask() recv pvminfo");
return PvmSysErr;
}
if ((pvmhostnode = _myhost()) < 0)
pvmlogperror("beatask() no controlling process");
#endif
if ((pvm_useruid = getuid()) == -1) {
pvmlogerror("can't getuid()\n");
return PvmSysErr;
}
#ifdef IMA_CM5
CMMD_enable_host();
CMMD_receive_bc_from_host(pvminfo, sizeof(pvminfo));
CMMD_reset_partition_size(pvminfo[0]);
if ((pvmmynode = CMMD_self_address()) >= pvminfo[0])
exit(0); /* no task to spawn on this node */
CMMD_fset_io_mode(stdout, CMMD_independent);
CMMD_fset_io_mode(stderr, CMMD_independent);
pvmhostnode = CMMD_host_node();
#endif
#ifdef IMA_SP2MPI
MPI_Init(&ac, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &pvmmynode);
MPI_Comm_size(MPI_COMM_WORLD, &pvmhostnode);
pvmhostnode--; /* host is last process in group */
if (!(p = getenv("PVMBUFSIZE")) || !(msgbufsiz = strtol(p, (char**)0, 0)))
msgbufsiz = MPIBUFSIZ;
if (!(msgbuf = malloc(msgbufsiz)))
pvmlogerror("relay() out of memory");
MPI_Buffer_attach(msgbuf, msgbufsiz); /* used in psend or relay */
if (pvmmynode == pvmhostnode)
(void)pvmhost();
MPI_Bcast(pvminfo, SIZEHINFO, MPI_INT, pvmhostnode, MPI_COMM_WORLD);
#endif
pvmmyupid = getpid();
/*
sprintf(pvmtxt, "pvminfo: %d %d %d %d %d node=%d\n", pvminfo[0], pvminfo[1], pvminfo[2], pvminfo[3], pvminfo[4], pvmmynode);
pvmlogerror(pvmtxt);
fflush(stdout);
*/
#ifdef IMA_PGON
if ((pvmmyptype = myptype()) == INVALID_PTYPE)
pvmlogerror("beatask() no process type\n");
pvmmynode = _mynode();
#endif
#ifdef IMA_I860
pvmmynode = _mydirect();
pvmmyvnode = _mynode();
if (_csend(PMTPHYS, &pvmmynode, sizeof(int), pvmhostnode, PVMDPTYPE) < 0)
pvmlogperror("beatask() can't send to host");
#endif
/*
* initialize received-message list and fragment reassembly list
*/
rxfrag = TALLOC(1, struct pmsg, "umb");
BZERO((char*)rxfrag, sizeof(struct pmsg));
rxfrag->m_link = rxfrag->m_rlink = rxfrag;
pvmrxlist = TALLOC(1, struct pmsg, "umb");
BZERO((char*)pvmrxlist, sizeof(struct pmsg));
pvmrxlist->m_link = pvmrxlist->m_rlink = pvmrxlist;
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
precvlist = TALLOC(1, struct pmsg, "umb");
BZERO((char*)precvlist, sizeof(struct pmsg));
precvlist->m_link = precvlist->m_rlink = precvlist;
#endif
for (i = 0; i < NUMSMHD; i++)
outfrags[i].tf_mhdl = PVMMHDNULL;
#if defined(IMA_PGON) || defined(IMA_I860)
_msgwait(rmid);
#endif
#if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_SP2MPI)
if (pvminfo[0] != TDPROTOCOL) {
sprintf(pvmtxt, "beatask() t-d protocol mismatch (%d/%d)\n",
TDPROTOCOL, pvminfo[0]);
pvmlogerror(pvmtxt);
return PvmSysErr;
}
#endif
n = 1;
mysetpart = pvminfo[n++];
pvmmyptid = pvminfo[n++];
pvmudpmtu = pvminfo[n++];
pvmmydsig = pvminfo[n++];
#if defined(IMA_PGON)
pvmpgonpartsize = pvminfo[n++];
#endif
if (!pvmtrc.outtid) {
pvmtrc.outtid = pvminfo[n++];
pvmtrc.outctx = pvminfo[n++];
pvmtrc.outtag = pvminfo[n++];
pvmctrc.outtid = pvmtrc.outtid;
pvmctrc.outctx = pvmtrc.outctx;
pvmctrc.outtag = pvmtrc.outtag;
}
if (!pvmtrc.trctid) {
pvmtrc.trctid = pvminfo[n++];
pvmtrc.trcctx = pvminfo[n++];
pvmtrc.trctag = pvminfo[n++];
pvmctrc.trctid = pvmtrc.trctid;
pvmctrc.trcctx = pvmtrc.trcctx;
pvmctrc.trctag = pvmtrc.trctag;
new_tracer++;
}
pvmmytid = mysetpart + pvmmynode;
pvmfrgsiz = pvmudpmtu;
if (p = getenv("PVMCTX"))
pvmmyctx = pvmstrtoi(p);
/* get trace mask from envar or zero it */
if ( (p = getenv("PVMTMASK")) ) {
if ( strlen(p) + 1 == TEV_MASK_LENGTH )
BCOPY(p, pvmtrc.tmask, TEV_MASK_LENGTH);
else
TEV_MASK_INIT(pvmtrc.tmask);
} else {
TEV_MASK_INIT(pvmtrc.tmask);
if ( new_tracer ) need_trcinfo++
}
BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);
/* get trace buffering from envar */
if ((p = getenv("PVMTRCBUF")))
pvmtrc.trcbuf = atoi( p );
else {
pvmtrc.trcbuf = 0;
if ( new_tracer ) need_trcinfo++
}
pvmctrc.trcbuf = pvmtrc.trcbuf;
/* get trace options from envar */
if ((p = getenv("PVMTRCOPT")))
pvmtrc.trcopt = atoi( p );
else {
pvmtrc.trcopt = 0;
if ( new_tracer ) need_trcinfo++
}
pvmctrc.trcopt = pvmtrc.trcopt;
BZERO(&minfo, sizeof(minfo));
minfo.src = -1;
minfo.ctx = SYSCTX_TC;
minfo.tag = TC_CONREQ;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conreq);
minfo.tag = TC_CONACK;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conack);
minfo.tag = TC_TASKEXIT;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_taskexit);
minfo.tag = TC_NOOP;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_noop);
minfo.tag = TC_SETTRACE;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrace);
minfo.tag = TC_SETTRCBUF;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcbuf);
minfo.tag = TC_SETTRCOPT;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcopt);
minfo.tag = TC_SETTMASK;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settmask);
minfo.tag = TC_SIBLINGS;
pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_siblings);
if ( need_trcinfo )
{
rbf = pvm_setrbuf( 0 );
if ( pvm_recvinfo( PVMTRACERCLASS, 0, PvmMboxDefault ) > 0 )
{
pvm_upkint(&trctid, 1, 1);
pvm_upkint(&trcctx, 1, 1);
pvm_upkint(&trctag, 1, 1);
pvm_upkint(&outctx, 1, 1); /* unused here */
pvm_upkint(&outtag, 1, 1); /* unused here */
pvm_upkstr(tmask);
pvm_upkint(&tbuf, 1, 1);
pvm_upkint(&topt, 1, 1);
if ( pvmtrc.trctid == trctid && pvmtrc.trcctx == trcctx
&& pvmtrc.trctag == trctag )
{
if ( strlen(tmask) + 1 == TEV_MASK_LENGTH ) {
BCOPY(tmask, pvmtrc.tmask, TEV_MASK_LENGTH);
BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);
}
pvmtrc.trcbuf = tbuf;
pvmctrc.trcbuf = pvmtrc.trcbuf;
pvmtrc.trcopt = topt;
pvmctrc.trcopt = pvmtrc.trcopt;
}
pvm_freebuf(pvm_setrbuf(rbf));
}
else
pvm_setrbuf(rbf);
}
tev_init();
if (TEV_AMEXCL) {
TEV_ENDEXCL;
}
return 0;
}
/*	pvmendtask()
*
*	Tear down the task side of libpvm:
*	  - wait for every asynchronous send still recorded in outfrags[]
*	  - mark the process as no longer being a task (pvmmytid = -1)
*	  - let the native message layer finish (CM5 / MPI builds)
*
*	Always returns 0.
*/
int
pvmendtask()
{
#if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_SP2MPI)
	int i;
#ifdef IMA_SP2MPI
	MPI_Status mpista;
#endif

	/* drain the table of outstanding send handles */
	for (i = 0; i < NUMSMHD; i++) {
		if (outfrags[i].tf_mhdl == PVMMHDNULL)
			continue;
		ASYNCWAIT(outfrags[i].tf_mhdl);
	}
#endif

	/* forget our identity */
	pvmmytid = -1;

#ifdef IMA_CM5
	CMMD_all_msgs_wait();
#endif

#ifdef IMA_SP2MPI
	MPI_Finalize();
#endif

	/* XXX free rxfrag and rxlist */

	return 0;
}
/*	check_for_exit()
*
*	Placeholder: this file gives it an empty body, so calling it
*	has no effect.
*
*	src	ignored
*/
void
check_for_exit(src)
	int src;
{
	(void)src;		/* deliberately unused */
}
/************************
** Libpvm Functions **
** **
************************/
/*	pvm_getfds()
*
*	Not available in this libpvm: the status is always PvmNotImpl,
*	which is reported through lpvmerr() when it is negative.
*	The fds argument is never written.
*/
int
pvm_getfds(fds)		/* XXX this function kinda sucks */
	int **fds;		/* fd list return */
{
	int status = PvmNotImpl;

	if (status < 0)
		return lpvmerr("pvm_getfds", status);
	return status;
}
/*	mpprecv()
*
*	Receive a message for pvm_precv() straight from the native
*	message layer, bypassing the libpvm message buffers.
*
*	tid	sender to accept, or -1 for any
*	tag	message tag to accept, or -1 for any
*	cp	destination buffer
*	len	size of cp in bytes
*	rtid	if non-null, receives the sender's tid
*	rtag	if non-null, receives the message tag
*	rlen	if non-null, receives the message length
*
*	Returns 1 if a message was received into cp,
*	0 if the caller should go on to pvm_recv() instead,
*	or a negative PVM error code (PvmSysErr, or whatever mroute()
*	returned) on error.
*/
int
mpprecv(tid, tag, cp, len, rtid, rtag, rlen)
	int tid;
	int tag;
	void *cp;
	int len;
	int *rtid;
	int *rtag;
	int *rlen;
{
#if defined(IMA_PGON) || defined(IMA_CM5)
	long info[8];
#endif
#ifdef IMA_SP2MPI
	MPI_Status info;
	int mpiflag = 0;
	int mpisiz;
#endif
	long node;
	struct pmsg *up;
	int cc;

	node = (tid == -1) ? MPPANYNODE : tid & pvmtidnmask;

#if defined(IMA_CM5) || defined(IMA_SP2MPI)
	/* first look at messages pvm_psend() picked up while it was blocked */
	for (up = precvlist->m_link; up != precvlist; up = up->m_link)
		/* message picked up by psend */
		if ((tag == -1 || tag == up->m_tag)
		&& (tid == -1 || node == up->m_src)) {
			struct frag *fp;

			fp = up->m_frag->fr_rlink;
			if (fp->fr_max < len)
				len = fp->fr_max;
			BCOPY(fp->fr_dat, cp, len);
			if (rlen)
				*rlen = len;
			if (rtid)
				*rtid = up->m_src + (pvmmytid & ~pvmtidnmask);
			if (rtag)
				*rtag = up->m_tag;
			LISTDELETE(up, m_link, m_rlink);
			if (pvmdebmask & PDMPACKET) {
				sprintf(pvmtxt, "pvm_precv() task %x len %d tag %d\n",
					up->m_src + (pvmmytid & ~pvmtidnmask),
					rlen ? *rlen : fp->fr_max, rtag ? *rtag : up->m_tag);
				pvmlogerror(pvmtxt);
			}
			umbuf_free(up);
			return 1;
		}
#endif

	/*
	* Msg routed from pvmd has type PMTPACK, so it may have already
	* been picked up by the last pvm_recv. If so, we call pvm_recv
	* to process it. Note we only need consider the case tid = -1,
	* because msg psent by another node cannot have type PMTPACK.
	*/
	if (tid == -1) {
		for (up = pvmrxlist->m_link; up != pvmrxlist; up = up->m_link)
			if (tag == -1 || tag == up->m_tag)
				return 0;		/* go on to pvm_recv */

		if (tag != -1)
			/* wait for msg with tag or PMTPACK */
			while (!MSGPROBE(MPPANYNODE, tag, info))
				if (MSGPROBE(pvmhostnode, PMTPACK, info)) {
					if ((cc = mroute(0, 0, 0, (struct timeval *)0)) < 0)
						return cc;
					for (up = pvmrxlist->m_link; up != pvmrxlist;
					up = up->m_link)
						if (tag == up->m_tag)
							return 0;	/* go on to pvm_recv */
				}
	}

	/* 1) from another node: 1a) tid = -1; 1b) tid given; 2) tag = -1 */
#ifndef IMA_PGON
	if (tag == -1)
		tag = MPPANYTAG;
#endif
	if ((mpierrcode =
	PVMCRECV(node, tag, (char*)cp, len, pvmmyptype, info))
#if defined(IMA_PGON) || defined(IMA_CM5)
	&& (mpierrcode < 0)
#endif
	) {
		pvmlogperror("precv() PVMCRECV");
		return PvmSysErr;
	}
	if ((node = MSGSENDER(info)) == pvmhostnode)
		/* got null msg; go on to pvm_recv */
		return 0;
	if (rlen)
		*rlen = MSGSIZE(info);
	if (rtid)
		*rtid = node + (pvmmytid & ~pvmtidnmask);
	if (rtag)
		*rtag = MSGTAG(info);
	if (pvmdebmask & PDMPACKET) {
		/*
		* node is a long and the MSGSIZE()/MSGTAG() results have a
		* macro-dependent type, so convert explicitly to the int that
		* %x / %d require.
		*/
		sprintf(pvmtxt, "pvm_precv() task %x len %d tag %d\n",
			(int)(node + (pvmmytid & ~pvmtidnmask)),
			(int)(rlen ? *rlen : MSGSIZE(info)),
			(int)(rtag ? *rtag : MSGTAG(info)));
		pvmlogerror(pvmtxt);
	}
	return 1;
}
/*	pvm_precv()
*
*	Receive a message directly into a user buffer.
*
*	tid	sender to accept, or -1 for any
*	tag	message tag to accept, or -1 for any
*	cp	destination buffer
*	len	capacity of cp, counted in items of type dt
*	dt	item type (PVM_BYTE, PVM_INT, ...); PVM_STR is not implemented
*	rtid	if non-null, receives the sender's tid
*	rtag	if non-null, receives the message tag
*	rlen	if non-null, receives the message length in bytes
*
*	On PGON / CM5 / SP2MPI builds a message from any source, or from a
*	node in the same partition, is first tried through mpprecv(); if
*	that reports nothing received, or on other builds and other
*	sources, the message is taken with pvm_recv() and unpacked as bytes.
*
*	Returns 0 on success, else a negative PVM error code (also reported
*	through lpvmerr()).  The output arguments are written only on
*	success.
*/
int
pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen)
	int tid;
	int tag;
	void *cp;
	int len;
	int dt;
	int *rtid;
	int *rtag;
	int *rlen;
{
	int nb = -1, mc = -1, src = -1;	/* never traced or copied out unset */
	int rbf;
	int cc = 0;
	long ad;
	TEV_DECLS

	if (TEV_EXCLUSIVE) {
		if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_ENTRY)) {
			TEV_PACK_INT( TEV_DID_RST, TEV_DATA_SCALAR, &tid, 1, 1 );
			TEV_PACK_INT( TEV_DID_RMC, TEV_DATA_SCALAR, &tag, 1, 1 );
			ad = (long)cp;
			TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
			TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
			TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
			TEV_FIN;
		}
	}

	/* convert the item count into a byte count */
	switch (dt) {

	case PVM_BYTE:
		len *= sizeof(char);
		break;

	case PVM_SHORT:
	case PVM_USHORT:
		len *= sizeof(short);
		break;

	case PVM_INT:
	case PVM_UINT:
		len *= sizeof(int);
		break;

	case PVM_LONG:
	case PVM_ULONG:
		len *= sizeof(long);
		break;

	case PVM_FLOAT:
		len *= sizeof(float);
		break;

	case PVM_CPLX:
		len *= sizeof(float) * 2;
		break;

	case PVM_DOUBLE:
		len *= sizeof(double);
		break;

	case PVM_DCPLX:
		len *= sizeof(double) * 2;
		break;

	case PVM_STR:
		cc = PvmNotImpl;
		break;

	default:
		cc = PvmBadParam;
		break;
	}

	if (!cc) {
#if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)
		int mask = pvmtidhmask | pvmtidtmask;	/* same partition */

		if (tid == -1
		|| (TIDISNODE(tid) && (tid & mask) == (pvmmytid & mask))) {
			/*
			* 1) Don't know where msg is coming from.
			* 2) Expect msg from a node in the same partition.
			*/
			cc = mpprecv(tid, tag, cp, len, &src, &mc, &nb);
			if (cc > 0) {
				/* whole msg recv'd: report it, success is 0 as below */
				if (rtid)
					*rtid = src;
				if (rtag)
					*rtag = mc;
				if (rlen)
					*rlen = nb;
				cc = 0;
				goto done;
			}
			if (cc < 0)
				/* error: src/mc/nb were not filled in, leave outputs alone */
				goto done;
			/* cc == 0: nothing taken natively, fall through to pvm_recv */
		}
#endif
		rbf = pvm_setrbuf(0);
		cc = pvm_recv(tid, tag);
		if (cc > 0) {
			pvm_bufinfo(cc, &nb, &mc, &src);
			if (rlen)
				*rlen = nb;
			if (nb < len)
				len = nb;		/* never unpack more than was sent */
			if (rtag)
				*rtag = mc;
			if (rtid)
				*rtid = src;
			pvm_upkbyte((char *)cp, len, 1);
			pvm_freebuf(cc);
			cc = 0;
		}
		pvm_setrbuf(rbf);
	}

done:
	if (TEV_AMEXCL) {
		if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_EXIT)) {
			TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
			if ( cc < 0 )
				nb = mc = src = -1;
			TEV_PACK_INT( TEV_DID_MNB, TEV_DATA_SCALAR, &nb, 1, 1 );
			TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &mc, 1, 1 );
			TEV_PACK_INT( TEV_DID_SRC, TEV_DATA_SCALAR, &src, 1, 1 );
			TEV_FIN;
		}
		TEV_ENDEXCL;
	}

	if (cc < 0)
		lpvmerr("pvm_precv", cc);
	return cc;
}
/*	pvm_psend()
*
*	Send the contents of a user buffer as one message.
*
*	tid	destination task
*	tag	message tag
*	cp	data to send
*	len	number of items of type dt in cp
*	dt	item type (PVM_BYTE, PVM_INT, ...); PVM_STR is not implemented
*
*	On PGON / CM5 / SP2MPI builds a destination that is a node with the
*	same host and process-type bits as this task is reached through the
*	native message layer; every other destination goes through
*	pvm_send() with an in-place buffer.
*
*	Returns 0 on success, else a negative PVM error code (also reported
*	through lpvmerr()).
*
*	NOTE(review): on CM5 the error exits from the wait loop skip
*	CMMD_free_mcb(mhdl); confirm whether the handle must be released.
*/
int
pvm_psend(tid, tag, cp, len, dt)
	int tid;
	int tag;
	void *cp;
	int len;
	int dt;
{
	int sbf;
	int cc = 0;
	long ad;
	TEV_DECLS

	if (TEV_EXCLUSIVE) {
		if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_ENTRY)) {
			TEV_PACK_INT( TEV_DID_DST, TEV_DATA_SCALAR, &tid, 1, 1 );
			TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &tag, 1, 1 );
			ad = (long)cp;
			TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
			TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
			TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
			TEV_FIN;
		}
	}

	/* convert the item count into a byte count */
	switch (dt) {

	case PVM_BYTE:
		len *= sizeof(char);
		break;

	case PVM_SHORT:
	case PVM_USHORT:
		len *= sizeof(short);
		break;

	case PVM_INT:
	case PVM_UINT:
		len *= sizeof(int);
		break;

	case PVM_LONG:
	case PVM_ULONG:
		len *= sizeof(long);
		break;

	case PVM_FLOAT:
		len *= sizeof(float);
		break;

	case PVM_CPLX:
		len *= sizeof(float) * 2;
		break;

	case PVM_DOUBLE:
		len *= sizeof(double);
		break;

	case PVM_DCPLX:
		len *= sizeof(double) * 2;
		break;

	case PVM_STR:
		cc = PvmNotImpl;
		break;

	default:
		cc = PvmBadParam;
		break;
	}

	if (!cc) {
#if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)
		int mask = pvmtidhmask;			/* host */
		long node = tid & pvmtidnmask;
#ifdef IMA_CM5
		CMMD_mcb mhdl;
		int info;
#endif
#ifdef IMA_SP2MPI
		MPI_Request mhdl;
		MPI_Status info, mpista;
		int mpiflag = 0;
		int mpisiz;
#endif

		mask |= pvmtidtmask;			/* process type */
		if (TIDISNODE(tid) && (tid & mask) == (pvmmytid & mask)) {
			if (pvmdebmask & PDMPACKET) {
				/* %ld needs long arguments: pass node, convert the ptype */
				sprintf(pvmtxt,
					"pvm_psend() dst t%x len %d ptype=%ld node=%ld\n",
					tid, len, (long)pvmmyptype, node);
				pvmlogerror(pvmtxt);
			}
			/* exactly one of the following "if" heads is compiled in */
#ifdef IMA_PGON
			if (PVMCSEND(tag, cp, len, node, pvmmyptype) < 0)
#endif
#ifdef IMA_CM5
			if ((mhdl = ASYNCSEND(tag, cp, len, node, pvmmyptype)) < 0)
#endif
#ifdef IMA_SP2MPI
			if (mpierrcode = MPI_Isend(cp, len, MPI_BYTE, node,
				tag, MPI_COMM_WORLD, &mhdl))
#endif
			{
				pvmlogperror("psend() PVMCSEND");
				cc = PvmSysErr;
				goto done;
			}
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
			while (!ASYNCDONE(mhdl)) {
				int rtag;
				/*
				* Psend is really a buffered send. We use async send to
				* avoid the overhead of buffering (50%). The behavior of
				* psend is now similar to RouteDirect: the sender blocks
				* until the other party signals its intention to receive,
				* but it can accept incoming messages in the meantime.
				* This avoids deadlock when two tasks send messages to
				* each other at the same time.
				*/
				if (MSGPROBE(MPPANYNODE, MPPANYTAG, info)) {
					if ((rtag = MSGTAG(info)) != PMTPACK) {
						struct pmsg *rxup;
						struct frag *fp;

						/* park the message on precvlist for mpprecv() */
						rxup = midtobuf(umbuf_new());
						rxup->m_src = MSGSENDER(info);
						rxup->m_tag = rtag;
						fp = fr_new(MSGSIZE(info));
						LISTPUTBEFORE(rxup->m_frag, fp, fr_link, fr_rlink);
						if ((mpierrcode = PVMCRECV(rxup->m_src, rtag,
							fp->fr_dat, fp->fr_max, pvmmyptype, info))
#ifdef IMA_CM5
						&& (mpierrcode < 0)
#endif
						) {
							pvmlogperror("psend() PVMCRECV");
							cc = PvmSysErr;
							goto done;
						}
						LISTPUTBEFORE(precvlist, rxup, m_link, m_rlink);
					} else {
						if ((cc = mroute(0, 0, 0, (struct timeval *)0)) < 0)
							goto done;
						/* a non-negative mroute() result is not our status */
						cc = 0;
					}
				}
			}
#ifdef IMA_CM5
			CMMD_free_mcb(mhdl);
#endif
#endif /*defined(IMA_CM5) || defined(IMA_SP2MPI)*/
		} else
#endif /*defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)*/
		{
			sbf = pvm_setsbuf(pvm_mkbuf(PvmDataInPlace));
			pvm_pkbyte((char *)cp, len, 1);
			if ((cc = pvm_send(tid, tag)) > 0)
				cc = 0;
			pvm_freebuf(pvm_setsbuf(sbf));
		}
	}

done:
	if (TEV_AMEXCL) {
		if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_EXIT)) {
			TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
			TEV_FIN;
		}
		TEV_ENDEXCL;
	}

	if (cc < 0)
		lpvmerr("pvm_psend", cc);
	return cc;
}